/*
Copyright (c) 2023 China Mobile Information Technology Co., Ltd
OpenGauss Operator is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
         http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/

package client

import (
	"bytes"
	"context"
	"fmt"
	"io"
	v13 "k8s.io/api/apps/v1"
	v12 "k8s.io/api/batch/v1"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/remotecommand"
	"k8s.io/klog/v2"
	gsv1 "openGauss-operator/api/v1"
	"openGauss-operator/internal/util"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"strings"
)

// K8sClient pairs a controller-runtime client with a REST config and exposes
// convenience helpers for the Kubernetes objects this operator works with.
type K8sClient struct {
	// client performs the typed Get/List/Create/Update/Patch/Delete calls.
	client client.Client
	// config is used to build a client-go clientset and SPDY executor for
	// pod log retrieval and in-pod command execution.
	config *rest.Config
}

// NewK8sClient returns a K8sClient that wraps the given controller-runtime
// client and REST config. The result is returned by value.
func NewK8sClient(client client.Client, config *rest.Config) K8sClient {
	var k K8sClient
	k.client = client
	k.config = config
	return k
}

// GetOpenGaussCluster fetches the OpenGaussCluster identified by
// namespacedName.
//
// The returned pointer is never nil, even when err is non-nil; in that case
// its contents should not be relied upon.
func (k *K8sClient) GetOpenGaussCluster(namespacedName types.NamespacedName) (*gsv1.OpenGaussCluster, error) {
	var cluster gsv1.OpenGaussCluster
	err := k.client.Get(context.TODO(), namespacedName, &cluster)
	return &cluster, err
}

// GetDaemonSet fetches the DaemonSet called name in namespace.
// It returns (nil, err) when the lookup fails.
func (k *K8sClient) GetDaemonSet(namespace, name string) (*v13.DaemonSet, error) {
	key := types.NamespacedName{Namespace: namespace, Name: name}

	var ds v13.DaemonSet
	if err := k.client.Get(context.TODO(), key, &ds); err != nil {
		return nil, err
	}
	return &ds, nil
}

// GetOpenGaussBackupRecovery fetches the OpenGaussBackupRecovery identified by
// namespacedName.
//
// The returned pointer is never nil, even when err is non-nil; in that case
// its contents should not be relied upon.
func (k *K8sClient) GetOpenGaussBackupRecovery(namespacedName types.NamespacedName) (*gsv1.OpenGaussBackupRecovery, error) {
	var backupRecovery gsv1.OpenGaussBackupRecovery
	err := k.client.Get(context.TODO(), namespacedName, &backupRecovery)
	return &backupRecovery, err
}

// GetPvList lists the PersistentVolumes whose labels match labels and returns
// pointers to the listed items. The result is a nil slice when nothing
// matches, and (nil, err) when the list call fails.
func (k *K8sClient) GetPvList(labels map[string]string) ([]*v1.PersistentVolume, error) {
	var list v1.PersistentVolumeList
	if err := k.client.List(context.TODO(), &list, client.MatchingLabels(labels)); err != nil {
		return nil, err
	}

	// Point at the elements of list.Items directly instead of copying them.
	var result []*v1.PersistentVolume
	for idx := 0; idx < len(list.Items); idx++ {
		result = append(result, &list.Items[idx])
	}
	return result, nil
}

// GetConfigMap fetches the ConfigMap called configMapName in namespace.
// It returns (nil, err) when the lookup fails.
func (k *K8sClient) GetConfigMap(configMapName string, namespace string) (*v1.ConfigMap, error) {
	key := client.ObjectKey{Namespace: namespace, Name: configMapName}

	var cm v1.ConfigMap
	err := k.client.Get(context.TODO(), key, &cm)
	if err != nil {
		return nil, err
	}
	return &cm, nil
}

// GetPvcList lists the PersistentVolumeClaims in namespace whose labels match
// labels and returns pointers to the listed items. The result is a nil slice
// when nothing matches, and (nil, err) when the list call fails.
func (k *K8sClient) GetPvcList(namespace string, labels map[string]string) ([]*v1.PersistentVolumeClaim, error) {
	var list v1.PersistentVolumeClaimList
	err := k.client.List(context.TODO(), &list,
		client.InNamespace(namespace),
		client.MatchingLabels(labels),
	)
	if err != nil {
		return nil, err
	}

	// Point at the elements of list.Items directly instead of copying them.
	var result []*v1.PersistentVolumeClaim
	for idx := 0; idx < len(list.Items); idx++ {
		result = append(result, &list.Items[idx])
	}
	return result, nil
}

// Create submits obj to the API server through the wrapped client and returns
// whatever error that call produces.
func (k *K8sClient) Create(obj client.Object) error {
	return k.client.Create(context.TODO(), obj)
}

// GetServiceList lists the Services in namespace whose labels match labels and
// returns pointers to the listed items. The result is a nil slice when nothing
// matches, and (nil, err) when the list call fails.
func (k *K8sClient) GetServiceList(namespace string, labels map[string]string) ([]*v1.Service, error) {
	var list v1.ServiceList
	err := k.client.List(context.TODO(), &list,
		client.InNamespace(namespace),
		client.MatchingLabels(labels),
	)
	if err != nil {
		return nil, err
	}

	// Point at the elements of list.Items directly instead of copying them.
	var result []*v1.Service
	for idx := 0; idx < len(list.Items); idx++ {
		result = append(result, &list.Items[idx])
	}
	return result, nil
}

// GetPodList lists the Pods in namespace whose labels match labels and returns
// pointers to the listed items. The result is a nil slice when nothing
// matches, and (nil, err) when the list call fails.
func (k *K8sClient) GetPodList(namespace string, labels map[string]string) ([]*v1.Pod, error) {
	var list v1.PodList
	err := k.client.List(context.TODO(), &list,
		client.InNamespace(namespace),
		client.MatchingLabels(labels),
	)
	if err != nil {
		return nil, err
	}

	// Point at the elements of list.Items directly instead of copying them.
	var result []*v1.Pod
	for idx := 0; idx < len(list.Items); idx++ {
		result = append(result, &list.Items[idx])
	}
	return result, nil
}

// GetPod fetches the Pod called name in namespace.
// It returns (nil, err) when the lookup fails.
func (k *K8sClient) GetPod(namespace string, name string) (*v1.Pod, error) {
	var found v1.Pod
	key := client.ObjectKey{Namespace: namespace, Name: name}

	err := k.client.Get(context.TODO(), key, &found)
	if err != nil {
		return nil, err
	}
	return &found, nil
}

// GetPv fetches the PersistentVolume called pvName, passing namespace through
// in the lookup key. It returns (nil, err) when the lookup fails.
//
// NOTE(review): PersistentVolumes are cluster-scoped in Kubernetes, so the
// namespace is presumably ignored by the client — confirm.
func (k *K8sClient) GetPv(pvName string, namespace string) (*v1.PersistentVolume, error) {
	key := client.ObjectKey{Namespace: namespace, Name: pvName}

	var volume v1.PersistentVolume
	err := k.client.Get(context.TODO(), key, &volume)
	if err != nil {
		return nil, err
	}
	return &volume, nil
}

// PatchPv sends a merge patch for modifyPv, using a deep copy of pv as the
// baseline the patch is computed from.
func (k *K8sClient) PatchPv(modifyPv *v1.PersistentVolume, pv *v1.PersistentVolume) error {
	baseline := pv.DeepCopyObject().(client.Object)
	return k.client.Patch(context.TODO(), modifyPv, client.MergeFrom(baseline))
}

// Update writes obj back through the wrapped client and returns whatever
// error that call produces.
func (k *K8sClient) Update(obj client.Object) error {
	return k.client.Update(context.TODO(), obj)
}

// UpdateOpenGaussClusterStatus updates modifyGs through the client's status
// writer (k.client.Status()) rather than the regular Update call.
func (k *K8sClient) UpdateOpenGaussClusterStatus(modifyGs *gsv1.OpenGaussCluster) error {
	statusWriter := k.client.Status()
	return statusWriter.Update(context.TODO(), modifyGs)
}

// UpdateOpenGaussBackupRecoveryObject sends a merge patch for modifyBr, using
// a deep copy of oldBr as the baseline the patch is computed from.
func (k *K8sClient) UpdateOpenGaussBackupRecoveryObject(modifyBr *gsv1.OpenGaussBackupRecovery, oldBr *gsv1.OpenGaussBackupRecovery) error {
	baseline := oldBr.DeepCopyObject().(client.Object)
	return k.client.Patch(context.TODO(), modifyBr, client.MergeFrom(baseline))
}

// UpdateOpenGaussClusterObject sends a merge patch for modifyOgc, using a deep
// copy of oldOgc as the baseline the patch is computed from.
func (k *K8sClient) UpdateOpenGaussClusterObject(modifyOgc *gsv1.OpenGaussCluster, oldOgc *gsv1.OpenGaussCluster) error {
	baseline := oldOgc.DeepCopyObject().(client.Object)
	return k.client.Patch(context.TODO(), modifyOgc, client.MergeFrom(baseline))
}

// GetCronJob fetches the CronJob called cronJobName in namespace.
// It returns (nil, err) when the lookup fails.
func (k *K8sClient) GetCronJob(cronJobName string, namespace string) (*v12.CronJob, error) {
	key := client.ObjectKey{Namespace: namespace, Name: cronJobName}

	var found v12.CronJob
	err := k.client.Get(context.TODO(), key, &found)
	if err != nil {
		return nil, err
	}
	return &found, nil
}

// GetJob fetches the Job called jobName in namespace.
// It returns (nil, err) when the lookup fails.
func (k *K8sClient) GetJob(jobName string, namespace string) (*v12.Job, error) {
	key := client.ObjectKey{Namespace: namespace, Name: jobName}

	var found v12.Job
	err := k.client.Get(context.TODO(), key, &found)
	if err != nil {
		return nil, err
	}
	return &found, nil
}

// DeleteCronJob deletes cronJob with default delete options and returns
// whatever error the delete call produces.
func (k *K8sClient) DeleteCronJob(cronJob *v12.CronJob) error {
	return k.client.Delete(context.TODO(), cronJob)
}

// DeleteJob deletes job using the background propagation policy so that the
// pods created by the job are deleted along with it.
func (k *K8sClient) DeleteJob(job *v12.Job) error {
	// Cascade the deletion to the pods the job created.
	policy := metaV1.DeletePropagationBackground
	opts := &client.DeleteOptions{PropagationPolicy: &policy}

	return k.client.Delete(context.TODO(), job, opts)
}

// PatchCronJob sends a merge patch for modifyCj, using a deep copy of oldCj as
// the baseline the patch is computed from.
func (k *K8sClient) PatchCronJob(modifyCj *v12.CronJob, oldCj *v12.CronJob) error {
	baseline := oldCj.DeepCopyObject().(client.Object)
	return k.client.Patch(context.TODO(), modifyCj, client.MergeFrom(baseline))
}

// DeletePod deletes the Pod called podName in namespace.
//
// A NotFound response is treated as success and returns nil without logging;
// a successful delete is logged at info level. Any other error is returned.
func (k *K8sClient) DeletePod(podName, namespace string) error {
	target := &v1.Pod{}
	target.Namespace = namespace
	target.Name = podName

	err := k.client.Delete(context.TODO(), target, &client.DeleteOptions{})
	switch {
	case err == nil:
		klog.Infof("Delete pod %s in namespace %s", podName, namespace)
		return nil
	case errors.IsNotFound(err):
		return nil
	default:
		return err
	}
}

// DeletePvc deletes the PersistentVolumeClaim called pvcName in namespace.
//
// A NotFound response is treated as success and returns nil without logging;
// a successful delete is logged at info level. Any other error is returned.
func (k *K8sClient) DeletePvc(pvcName, namespace string) error {
	target := &v1.PersistentVolumeClaim{}
	target.Namespace = namespace
	target.Name = pvcName

	err := k.client.Delete(context.TODO(), target, &client.DeleteOptions{})
	switch {
	case err == nil:
		klog.Infof("Delete pvcName %s in namespace %s", pvcName, namespace)
		return nil
	case errors.IsNotFound(err):
		return nil
	default:
		return err
	}
}

// GetService fetches the Service called name in namespace.
// It returns (nil, err) when the lookup fails.
func (k *K8sClient) GetService(namespace string, name string) (*v1.Service, error) {
	var found v1.Service
	key := client.ObjectKey{Namespace: namespace, Name: name}

	err := k.client.Get(context.TODO(), key, &found)
	if err != nil {
		return nil, err
	}
	return &found, nil
}

// DeletePv deletes the PersistentVolume called pvName; namespace is set on the
// object handed to the client and echoed in the log line.
//
// A NotFound response is treated as success and returns nil without logging;
// a successful delete is logged at info level. Any other error is returned.
func (k *K8sClient) DeletePv(pvName, namespace string) error {
	target := &v1.PersistentVolume{}
	target.Namespace = namespace
	target.Name = pvName

	err := k.client.Delete(context.TODO(), target, &client.DeleteOptions{})
	switch {
	case err == nil:
		klog.Infof("Delete pvName %s in namespace %s", pvName, namespace)
		return nil
	case errors.IsNotFound(err):
		return nil
	default:
		return err
	}
}

// ExecCommand runs command inside the container named by
// util.ContainerNameGaussDb of pod targetName in namespace, and returns the
// command's stdout with trailing newlines removed.
//
// The call is treated as failed when the exec itself returns an error, when
// anything was written to stderr, or when stdout contains the "[ERR]" marker.
// In that case the failure is logged once and returned as an error that embeds
// the target, command, stdout, stderr and the underlying error; the returned
// string is empty.
func (k *K8sClient) ExecCommand(namespace string, command []string, targetName string) (string, error) {
	stdout, stderr, err := k.ExecToPodThroughAPI(command, util.ContainerNameGaussDb, targetName, namespace, nil)
	if err != nil || stderr != "" || strings.Contains(stdout, "[ERR]") {
		// err may legitimately be nil here (stderr/[ERR] cases), so it is
		// formatted with %v rather than wrapped with %w.
		execErr := fmt.Errorf("gsClusterInstance: %v -- infoCmd: %v\n -- stdout: %v\n -- stderr: %v\n -- error: %v", targetName, command, stdout, stderr, err)
		// Log the error as a value: passing its text as a format string would
		// mangle any '%' that appears in the command output.
		klog.Error(execErr)
		return "", execErr
	}
	return strings.TrimRight(stdout, "\n"), nil
}

// GetPodLog returns the tail of the log of pod podName in namespace as a
// single string. The number of lines requested is util.PodLogTailLine.
//
// A fresh clientset is built from k.config on every call. Failures while
// creating the clientset, opening the log stream, or copying it are returned
// through util.ReturnError together with an empty string.
func (k *K8sClient) GetPodLog(namespace, podName string) (string, error) {
	clientSet, err := kubernetes.NewForConfig(k.config)
	if err != nil {
		return "", util.ReturnError("Create clientSet Error", err)
	}

	// Without TailLines the request would return every log line the pod has
	// produced since it started, so only the tail is requested.
	var tailLines int64 = util.PodLogTailLine
	logOptions := &v1.PodLogOptions{TailLines: &tailLines}

	stream, err := clientSet.CoreV1().Pods(namespace).GetLogs(podName, logOptions).Stream(context.TODO())
	if err != nil {
		return "", util.ReturnError("Get pod log Error", err)
	}
	defer func() {
		if stream != nil {
			stream.Close()
		}
	}()

	// Drain the stream into memory and hand it back as text.
	var collected bytes.Buffer
	if _, err = io.Copy(&collected, stream); err != nil {
		return "", util.ReturnError("Copy pod logs Error", err)
	}
	return collected.String(), nil
}

// ExecToPodThroughAPI executes command in container containerName of pod
// podName in namespace through the pod "exec" subresource, streaming over
// SPDY, and returns the captured stdout and stderr.
//
// stdin may be nil; when it is non-nil the exec request is opened with stdin
// enabled and the reader is forwarded to the remote process. No TTY is
// allocated.
//
// On a streaming failure the output captured so far is returned together with
// the error. Failures before streaming starts (clientset, scheme or executor
// construction) return empty strings and the wrapped error.
func (k *K8sClient) ExecToPodThroughAPI(command []string, containerName, podName, namespace string, stdin io.Reader) (string, string, error) {
	klog.V(4).Infof("exec To Pod Through API %v/%v -- %v", namespace, podName, command)

	// Without this check a failed clientset construction would surface as a
	// nil-pointer panic on the first use of clientset below.
	clientset, err := kubernetes.NewForConfig(k.config)
	if err != nil {
		return "", "", fmt.Errorf("error while creating clientset: %w", err)
	}

	scheme := runtime.NewScheme()
	if err := v1.AddToScheme(scheme); err != nil {
		return "", "", fmt.Errorf("error adding to scheme: %w", err)
	}

	req := clientset.CoreV1().RESTClient().Post().
		Resource("pods").
		Namespace(namespace).
		Name(podName).
		SubResource("exec").
		VersionedParams(&v1.PodExecOptions{
			Command:   command,
			Container: containerName,
			// Only ask for a stdin stream when the caller supplied one, so
			// the request and the StreamOptions below agree.
			Stdin:  stdin != nil,
			Stdout: true,
			Stderr: true,
			TTY:    false,
		}, runtime.NewParameterCodec(scheme))

	exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
	if err != nil {
		return "", "", fmt.Errorf("error while creating executor: %w", err)
	}

	var stdout, stderr bytes.Buffer
	err = exec.Stream(remotecommand.StreamOptions{
		Stdin:  stdin,
		Stdout: &stdout,
		Stderr: &stderr,
		Tty:    false,
	})
	if err != nil {
		// Return whatever was captured; callers include it in diagnostics.
		return stdout.String(), stderr.String(), fmt.Errorf("error in Stream: %w", err)
	}

	return stdout.String(), stderr.String(), nil
}

// GetPrimaryPodName returns a key from the map built by
// ClusterMsgConvertToMap whose value is "Primary" and whose name does not
// contain "cms".
//
// Map iteration order is unspecified, so if several entries qualify any one of
// them may be returned. An error from ClusterMsgConvertToMap is passed
// through; when no entry qualifies the result of util.ReturnError is returned.
func (k *K8sClient) GetPrimaryPodName(gs *gsv1.OpenGaussCluster) (string, error) {
	states, err := k.ClusterMsgConvertToMap(gs)
	if err != nil {
		return "", err
	}

	for name, role := range states {
		// Skip non-primary entries and anything whose name contains "cms".
		if role != "Primary" || strings.Contains(name, "cms") {
			continue
		}
		return name, nil
	}

	return "", util.ReturnError("Primary pod name is nil", nil)
}

// GetClusterState returns the cluster state, obtained by running the
// util.CommandGetClusterState command in pod targetPodName via ExecCommand.
//
// Documented states:
//   - Normal: the database instance is available and the data has redundant
//     backups. All processes are running and the primary/standby relationship
//     is normal.
//   - Degraded: the database instance is available, but the data has no
//     redundant backup.
//   - Unavailable: the database instance is not available.
func (k *K8sClient) GetClusterState(namespace, targetPodName string) (string, error) {
	return k.ExecCommand(namespace, util.Command(util.CommandGetClusterState), targetPodName)
}

// GetClusterMsg retrieves the output of "cm_ctl query -Cv" for the cluster.
//
// It walks gs.Status.Conditions in order and runs the
// util.CommandGetClusterMsg command in the pod named by each entry, returning
// the first stdout (trailing newlines trimmed) that came back without an exec
// error, without stderr output and without an "[ERR]" marker. Pods that fail
// any of those checks are logged and skipped.
//
// It returns the result of util.ReturnError when Conditions is nil, and
// ("", nil) when no pod produced a usable answer.
func (k *K8sClient) GetClusterMsg(gs *gsv1.OpenGaussCluster) (string, error) {
	podConditions := gs.Status.Conditions
	if podConditions == nil {
		return "", util.ReturnError("conditions is nil", nil)
	}

	for idx := range podConditions {
		podName := podConditions[idx].Name
		stdout, stderr, err := k.ExecToPodThroughAPI(util.Command(util.CommandGetClusterMsg), util.ContainerNameGaussDb, podName, gs.Namespace, nil)

		switch {
		case err != nil:
			// The exec failed for this pod; try the next one.
			klog.Info(fmt.Sprintf("Execute command exception through pod %s ", podName), err)
		case stderr != "" || strings.Contains(stdout, "[ERR]"):
			// The command ran but reported a problem; try the next one.
			klog.Info(fmt.Sprintf("Node %s initialization not completed", podName))
		default:
			return strings.TrimRight(stdout, "\n"), nil
		}
	}

	return "", nil
}

// ClusterMsgConvertToMap fetches the cluster message via GetClusterMsg and
// parses it line by line into a map:
//
//   - a line containing "<gs.Name>-cms" contributes fields[1] -> fields[3] of
//     its whitespace-separated fields;
//   - a line containing "cluster_state" contributes
//     "cluster_state" -> the text between the first and second ':' once all
//     spaces are removed;
//   - a line containing "<gs.Name>-0" is split on '|' and every segment
//     contributes fields[1] -> fields[4] of its whitespace-separated fields.
//
// Lines or segments with too few fields are skipped instead of causing an
// index-out-of-range panic. The returned map is never nil; when GetClusterMsg
// fails, the error is logged and returned together with an empty map.
func (k *K8sClient) ClusterMsgConvertToMap(gs *gsv1.OpenGaussCluster) (map[string]string, error) {
	clusterMsgMap := map[string]string{}
	msg, err := k.GetClusterMsg(gs)
	if err != nil {
		klog.Error("cm_ctl query -Cv error ：", err)
		return clusterMsgMap, err
	}

	// These markers do not change per line, so build them once.
	cmsName := gs.Name + "-cms"
	dnName := gs.Name + "-0"

	for _, line := range strings.Split(msg, "\n") {
		// CMServer state.
		if strings.Contains(line, cmsName) {
			if fields := strings.Fields(line); len(fields) > 3 {
				clusterMsgMap[fields[1]] = fields[3]
			}
		}

		// Cluster state.
		if strings.Contains(line, "cluster_state") {
			parts := strings.Split(strings.ReplaceAll(line, " ", ""), ":")
			if len(parts) > 1 {
				clusterMsgMap["cluster_state"] = parts[1]
			}
		}

		// Datanode state: one '|'-separated segment per node.
		if strings.Contains(line, dnName) {
			for _, nodeMsg := range strings.Split(line, "|") {
				if fields := strings.Fields(nodeMsg); len(fields) > 4 {
					clusterMsgMap[fields[1]] = fields[4]
				}
			}
		}
	}
	return clusterMsgMap, nil
}
